package cn.fzkj.service;

import cn.fzkj.pojo.User;
import cn.fzkj.repository.UserRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Map;


/**
 * Reactive service facade over {@link UserRepository}.
 *
 * <p>Every query method hands back the publisher produced by the repository; nothing is
 * subscribed to or blocked on here, with the exception of the {@link #test()} demo.
 */
@Service
public class UserService {

    /**
     * Client pointed at {@code http://localhost:8080}. No active code in this class uses it;
     * it is only referenced by the disabled HTTP variant kept as a comment in
     * {@link #findById(Long)}.
     */
    private static final WebClient webClient = WebClient.create("http://localhost:8080");

    @Autowired
    private UserRepository repository;

    /**
     * Looks up a user by user name.
     *
     * @param name user name passed to {@code repository.findUserByUserName}
     * @return the repository's result; completes empty when the repository emits nothing
     */
    public Mono<User> findUserByName(String name) {
        // An empty Mono is already the "not found" signal, so no switchIfEmpty fallback is needed.
        return repository.findUserByUserName(name);
    }

    /**
     * Looks up all users registered under an address.
     *
     * @param address address passed to {@code repository.findAllByAddress}
     * @return the repository's result; completes empty when the repository emits nothing
     */
    public Flux<User> findUserByAddress(String address) {
        // An empty Flux is already the "no matches" signal, so no switchIfEmpty fallback is needed.
        return repository.findAllByAddress(address);
    }

    /**
     * Looks up a user by id through the repository.
     *
     * @param id identifier passed to {@code repository.findById}
     * @return the repository's result
     */
    public Mono<User> findById(Long id) {
        // Disabled alternative that fetched the user over HTTP instead of from the repository:
//        Mono<String> just = Mono.just("");
//        return webClient.get()
//                .uri("/user/"+ id)
//                .accept(MediaType.APPLICATION_JSON)
//                .acceptCharset(StandardCharsets.UTF_8)
//                .attribute("id", 99)
//                .retrieve()
//                .bodyToMono(User.class);
        return repository.findById(id);
    }

    /**
     * Console demo of a polling pipeline built on {@code Flux.interval}.
     *
     * <p>The pipeline is created with a one-second interval, limited to indexes below 10, and
     * additionally stopped by a {@code takeUntil} predicate that returns {@code true} for
     * index 5. The {@code doOnNext} step throws for index 4; that error is printed and then
     * converted into a normal completion by {@code onErrorComplete()}.
     *
     * <p>After subscribing, the calling thread is parked for 60 seconds so the asynchronous
     * pipeline has time to print its output.
     *
     * @throws RuntimeException if the calling thread is interrupted while waiting; the
     *                          thread's interrupt flag is restored before the exception is thrown
     */
    public static void test() {
        Flux.interval(Duration.ofSeconds(1))
                .takeWhile(item -> item < 10)
                .takeUntil(index -> {
                    // Placeholder for checking a flag stored in Redis:
//                    boolean exist = redisCache.exist(key);
//                    return exist;
                    System.out.println("index: " + index);
                    // Simulates a slow lookup inside the predicate.
                    sleepOrFail(1000);
                    return index == 5;
                })
                .doOnNext(index -> {
//                    Mono<User> userByUserName = repository.findById(index);
                    System.out.println("---- " + index);
                    if (index == 4) {
                        throw new RuntimeException("出错啦");
                    }
//                    userByUserName.subscribe(south -> System.out.println(south));
                    // This is where the retried action goes,
                    // e.g. publishing to a message queue or pushing over a websocket.
                })
                .subscribeOn(Schedulers.parallel())
                .doOnError(e -> System.out.println("error: " + e))
                .onErrorComplete()
                .doOnComplete(() -> System.out.println("completed."))
                .doFinally((type) -> System.out.println("finally."))
                .subscribe();

        // Keep the caller alive while the pipeline runs on another thread.
        sleepOrFail(60000);
    }

    /**
     * Sleeps the current thread, translating an interruption into an unchecked exception.
     *
     * @param millis how long to sleep, in milliseconds
     * @throws RuntimeException wrapping the {@link InterruptedException} if interrupted
     */
    private static void sleepOrFail(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Persists a user through the repository.
     *
     * @param user entity passed to {@code repository.save}
     * @return the repository's result
     */
    public Mono<User> save(User user) {
        return repository.save(user);
    }

    // Disabled reference implementation, kept for illustration only.
    //
    // While the popup's retention period is still running, push a websocket "duty inspection"
    // message to the front end every 10 seconds, until the inspection record's status becomes
    // on-duty or the retention period has elapsed.
    //
    //   dto     message payload to push
    //   from    sender
    //   planId  inspection plan id
    //   tos     recipients
    //
//    private void pushRepeat(InspectWebSocketDto dto, Long from, Long planId, List<Long> tos) {
//        Integer retentionPeriod = dto.retention_period(); // minutes
//        AtomicBoolean last = new AtomicBoolean(false);
//        Flux.interval(Duration.ofSeconds(10))
//                .takeWhile(index -> index < retentionPeriod * 60 / 5)
//                .doOnNext(index -> {
//                    Long logId = dto.logId();
//                    InspectLogResp logResp = logService.findById(logId);
//                    if (InspectStatus.ON.getStatus().equals(logResp.getInspectStatus())
//                            || retentionPeriod * 60 - (index * 10) <= 0) {
//                        last.set(true);
//                    }else {
//                        log.info("Duty inspection [{}] got no response, pushing again", planId);
//                        long retention = retentionPeriod * 60 - (index * 10);
//                        webSocketService.sendMsg(tos, from, WsMessage.Type.INSPECT,
//                                new InspectWebSocketDto(dto.taskId(), dto.planId(), logId, (int) retention));
//                    }
//                })
//                .takeUntil(index -> last.get())
//                .doOnError((err) -> {
//                    log.warn("Duty inspection push failed [{}]", err.getMessage());
//                })
//                .onErrorComplete()
//                .subscribeOn(Schedulers.parallel())
//                .subscribe();
//    }
}
